e073db33184a7c7b5dc5af117e917ac03f09780b,cdap-notifications/src/test/java/co/cask/cdap/notifications/NotificationTest.java,NotificationTest,onePublisherOneSubscriberTest,#,142
Before Change
Assert.assertTrue(feedClient.createFeed(FEED1));
try {
final NotificationClient.Publisher<String> publisher = getNotificationClient().createPublisher(FEED1);
try {
// Create a subscribing process
NotificationClient.Subscriber subscriber = getNotificationClient().createSubscriber();
final AtomicInteger receiveCount = new AtomicInteger(0);
final AtomicBoolean assertionOk = new AtomicBoolean(true);
// Register a handler on FEED1 that checks every received payload against the running receive count.
subscriber.add(FEED1, new NotificationHandler<String>() {
// Returns String.class as the notification feed type of this handler.
@Override
public Type getNotificationFeedType() {
return String.class;
}
// Expects the payload to be "fake-payload-<n>", where n is the number of notifications accepted so far
// (receiveCount starts at 0). The counter is advanced only when the check passes.
@Override
public void processNotification(String notification, NotificationContext notificationContext) {
LOG.debug("Received notification payload: {}", notification);
try {
Assert.assertEquals("fake-payload-" + receiveCount.get(), notification);
receiveCount.incrementAndGet();
} catch (Throwable t) {
// Record the failure in the shared flag before rethrowing.
// NOTE(review): presumably the handler runs off the test thread, so this flag is how the test
// observes the failure - confirm against the rest of the test method.
assertionOk.set(false);
Throwables.propagate(t);
}
}
});
Cancellable cancellable = subscriber.consume();
// Runnable to publish notifications on behalf of the publisher entity
Runnable publisherRunnable = new Runnable() {
After Change
final AtomicInteger receiveCount = new AtomicInteger(0);
final AtomicBoolean assertionOk = new AtomicBoolean(true);
// Subscribe a handler to FEED1 that checks every received payload against the running receive count;
// the Cancellable returned by subscribe() is kept in a local variable.
Cancellable cancellable = notificationService.subscribe(FEED1, new NotificationHandler<String>() {
// Returns String.class as the notification feed type of this handler.
@Override
public Type getNotificationFeedType() {
return String.class;
}
// Expects the payload to be "fake-payload-<n>", where n is the number of notifications accepted so far
// (receiveCount starts at 0). The counter is advanced only when the check passes.
@Override
public void processNotification(String notification, NotificationContext notificationContext) {
LOG.debug("Received notification payload: {}", notification);
try {
Assert.assertEquals("fake-payload-" + receiveCount.get(), notification);
receiveCount.incrementAndGet();
} catch (Throwable t) {
// Record the failure in the shared flag before rethrowing.
// NOTE(review): presumably the handler runs off the test thread, so this flag is how the test
// observes the failure - confirm against the rest of the test method.
assertionOk.set(false);
Throwables.propagate(t);
}
}
});
// Runnable to publish notifications on behalf of the publisher entity
Runnable publisherRunnable = new Runnable() {